package com.huaweicloud.flume;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TimeStampInterceptor implements Interceptor {

    private ArrayList<Event> events = new ArrayList<>();

    @Override
    public void initialize() {

    }

/*
    hdfs.useLocalTimeStamp	false	(非常重要,使用服务器本地的时间)
    如果不使用本地时间，要求flume发送的事件header中带有时间戳(key为 timestamp )。（使用flume发送过来的时间）
    该时间用于替换逃逸字符
    */



//event    header   map<String,String>   body   json字符串
    @Override
    public Event intercept(Event event) {

        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        JSONObject jsonObject = JSONObject.parseObject(log);

        String timestamp ="";

        if(jsonObject.containsKey("ts")) {
            timestamp=jsonObject.getString("ts");
        }
        else {
            timestamp=System.currentTimeMillis()+"";
        }
        headers.put("timestamp", timestamp);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        events.clear();
        for (Event event : list) {
            events.add(intercept(event));
        }

        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
